package com.pdd;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Arrays;
import java.util.Properties;

@SpringBootApplication
@Slf4j
public class RabbitmqApplication {

    public static void main(String[] args) {
//        SpringApplication.run(RabbitmqApplication.class,args);

        Serde<String> keyType = Serdes.String();
        Properties properties=new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"kafka-stream-pdd-wordcount");
        properties.put(StreamsConfig.CLIENT_ID_CONFIG,"kafka-stream-pdd-client");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"172.21.12.1:9092,172.21.12.1:9093,172.21.12.1:9094");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keyType.getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, keyType.getClass().getName());
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,10*1000);
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,0);

        StreamsBuilder streamsBuilder=new StreamsBuilder();
        KStream<String, String> pdd = streamsBuilder.stream("pdd");

        KTable<String, Long> wordCounts = pdd.flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
        .groupBy((key, word) -> {
            System.out.println(key+"\t"+word);
            return word;
        }).count();

        wordCounts.toStream().to("pdd-out", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams kafkaStreams=new KafkaStreams(streamsBuilder.build(),properties);

        kafkaStreams.start();

    }
}